package proccess

import (
	"fmt"
	. "gitee.com/youbeiwuhuan/go-xxljob-executor/biz/model"
	. "gitee.com/youbeiwuhuan/go-xxljob-executor/global"
	"gitee.com/youbeiwuhuan/go-xxljob-executor/utils/files"
	"gitee.com/youbeiwuhuan/go-xxljob-executor/utils/jsons"
	"go.uber.org/zap"
	"io"
	"io/ioutil"
	"os"
	"strconv"
	"strings"
	"sync"
	"time"
)

const (
	DEFAULT_TRIGGER_QUEUE_SIZE = 50
	RETRY_BEAT_TIMEOUT         = 30
)

// 触发任务回调处理器
type TriggerCallbackProccessor struct {
	stopedTrigger        chan int8
	stopedRetry          chan int8
	callBackQueue        chan HandleCallbackParam
	failCallbackFilePath string

	wg sync.WaitGroup
}

func NewTriggerCallbackProccessor() *TriggerCallbackProccessor {

	return &TriggerCallbackProccessor{
		stopedTrigger:        make(chan int8),
		stopedRetry:          make(chan int8),
		callBackQueue:        make(chan HandleCallbackParam, DEFAULT_TRIGGER_QUEUE_SIZE),
		failCallbackFilePath: XxlLogPath + "/callbacklog",
		wg:                   sync.WaitGroup{},
	}

}

func (t *TriggerCallbackProccessor) PushTriggerCallback(callbackParam HandleCallbackParam) {
	fmt.Println("-----------------------------", callbackParam.String())
	t.callBackQueue <- callbackParam
}

func (t *TriggerCallbackProccessor) Start() {
	files.CreateDirIfNotExists(t.failCallbackFilePath)

	go func() {

	LOOP_CALLBACK:
		for {

			select {
			case callback := <-t.callBackQueue:
				t.doCallBack(callback)
			case <-t.stopedTrigger:
				break LOOP_CALLBACK
			default:
			}

		}
	}()

	go func() {
	LOOP_RETRY:
		for {
			select {
			case <-t.stopedRetry:
				break LOOP_RETRY
			default:
				t.retryFailCallbackFile()
			}

			time.Sleep(time.Duration(RETRY_BEAT_TIMEOUT) * time.Second)

		}
	}()

}

func (t *TriggerCallbackProccessor) Stop() {

	defer func() {
		close(t.stopedTrigger)
		close(t.stopedRetry)
		close(t.callBackQueue)
	}()

	t.stopedTrigger <- 0
	t.stopedRetry <- 0

}

func (t *TriggerCallbackProccessor) doCallBack(callback HandleCallbackParam) {
	callbackSuccess := false

	for _, adminBiz := range AdminBizList {
		rt := adminBiz.Callback([]HandleCallbackParam{callback})
		if SUCCESS_CODE == rt.Code {
			t.callbackLog(callback, "<br>----------- xxl-job job callback finish.")
			callbackSuccess = true
			break
		} else {
			t.callbackLog(callback, "<br>----------- xxl-job job callback fail, callbackResult:"+rt.String())
		}
	}

	if !callbackSuccess {
		t.appendFailCallbackFile(callback)
	}

}

func (t *TriggerCallbackProccessor) getFailCallbackFileName(failCallbackFilePath string) string {
	return strings.Replace(failCallbackFilePath+"/xxl-job-callback-{x}.log", "{x}", strconv.Itoa(time.Now().Nanosecond()), 1)
}

func (t *TriggerCallbackProccessor) appendFailCallbackFile(callback HandleCallbackParam) {
	f, err := os.Create(t.getFailCallbackFileName(t.failCallbackFilePath))
	if nil != err {
		Logger.Error("create file error", zap.Any("err", err))
	}
	defer f.Close()

	io.WriteString(f, jsons.ToJsonString(callback))
}

func (t *TriggerCallbackProccessor) callbackLog(callback HandleCallbackParam, logContent string) {

}

func (t *TriggerCallbackProccessor) retryFailCallbackFile() {
	files, err := ioutil.ReadDir(t.failCallbackFilePath)
	if nil != err {
		Logger.Error("", zap.Any("error", err))
		return
	}

	if nil == files || 0 == len(files) {
		return
	}

	for _, f := range files {
		if f.IsDir() {
			continue
		}

		fpath := t.failCallbackFilePath + "/" + f.Name()

		bytes, err := ioutil.ReadFile(fpath)
		if nil != err {
			continue
		}

		callback := HandleCallbackParam{}
		jsons.JsonStringToObj(string(bytes), &callback)

		os.Remove(fpath)
		t.doCallBack(callback)
	}

}
